{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1e90b576",
   "metadata": {},
   "outputs": [],
   "source": [
    "%session_id_prefix native-delta-sql-\n",
    "%glue_version 3.0\n",
    "%idle_timeout 60\n",
    "%%configure \n",
    "{\n",
    "  \"--conf\": \"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n",
    "  \"--datalake-formats\": \"delta\"\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9a034a2e",
   "metadata": {},
   "outputs": [],
   "source": [
    "# User settings: replace both placeholders with your own S3 bucket and key prefix.\n",
    "bucket_name = \"<Your S3 bucket name>\"\n",
    "bucket_prefix = \"<Your S3 bucket prefix>\"\n",
    "\n",
    "# Catalog identifiers. The %%sql cells in this notebook hard-code `delta_sql.products`,\n",
    "# so changing either name here also requires editing those cells.\n",
    "database_name = \"delta_sql\"\n",
    "table_name = \"products\"\n",
    "\n",
    "# S3 key prefixes (no scheme, no bucket name) built from the settings above.\n",
    "database_prefix = f\"{bucket_prefix}/{database_name}\"\n",
    "table_prefix = f\"{database_prefix}/{table_name}\"\n",
    "\n",
    "# Full s3:// URIs, each ending in a slash.\n",
    "# NOTE(review): database_location is not referenced by any other cell in this notebook.\n",
    "database_location = f\"s3://{bucket_name}/{database_prefix}/\"\n",
    "table_location = f\"s3://{bucket_name}/{table_prefix}/\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "429b7d93",
   "metadata": {},
   "source": [
    "## Clean up existing resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "47da9906",
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "\n",
    "s3 = boto3.resource('s3')\n",
    "bucket = s3.Bucket(bucket_name)\n",
    "\n",
    "# Select every object stored under the table prefix. The trailing slash limits the match\n",
    "# to keys inside that prefix rather than keys that merely start with the table name.\n",
    "stale_objects = bucket.objects.filter(Prefix=f\"{table_prefix}/\")\n",
    "\n",
    "# Delete them so the notebook starts from an empty table location.\n",
    "stale_objects.delete()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5eebe198",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "DROP TABLE IF EXISTS delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "483e0bb9",
   "metadata": {},
   "source": [
    "## Create Delta table with sample data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6dc864b1",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "import time\n",
    "\n",
    "# One epoch timestamp (float seconds from time.time()) shared by every seed row.\n",
    "ut = time.time()\n",
    "\n",
    "# Seed rows for the products table, one dict per product.\n",
    "product = [\n",
    "    dict(product_id='00001', product_name='Heater', price=250, category='Electronics', updated_at=ut),\n",
    "    dict(product_id='00002', product_name='Thermostat', price=400, category='Electronics', updated_at=ut),\n",
    "    dict(product_id='00003', product_name='Television', price=600, category='Electronics', updated_at=ut),\n",
    "    dict(product_id='00004', product_name='Blender', price=100, category='Electronics', updated_at=ut),\n",
    "    dict(product_id='00005', product_name='USB charger', price=50, category='Electronics', updated_at=ut),\n",
    "]\n",
    "\n",
    "# Turn each dict into a Row and hand Spark the materialised list.\n",
    "df_products = spark.createDataFrame([Row(**x) for x in product])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "92a476a4",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Write the sample rows to table_location in Delta format.\n",
    "# Mode 'overwrite' replaces whatever table data is already at that path.\n",
    "(\n",
    "    df_products.write\n",
    "    .format(\"delta\")\n",
    "    .mode(\"overwrite\")\n",
    "    .save(table_location)\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3f184165",
   "metadata": {},
   "source": [
    "## Create a Delta Lake table in the metastore"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6cd86cb5",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "CREATE DATABASE IF NOT EXISTS delta_sql"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "446bebbd",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Register the Delta data stored at table_location as a table in the metastore.\n",
    "# The statement declares no column list; only the format and the location are given.\n",
    "# IF NOT EXISTS keeps this cell re-runnable, in line with CREATE DATABASE IF NOT EXISTS.\n",
    "query = f\"\"\"\n",
    "CREATE TABLE IF NOT EXISTS {database_name}.{table_name}\n",
    "USING delta\n",
    "LOCATION '{table_location}'\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a9736c18",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "USE delta_sql"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ae6a7942",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SHOW TABLES"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "50a6e89e",
   "metadata": {},
   "source": [
    "## Read from Delta Lake table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "47777845",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "-- Read table from metastore\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "948f6bf4",
   "metadata": {},
   "source": [
    "## Insert records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "af3b1204",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Append two new products through SQL.\n",
    "# The VALUES tuples carry no column list, so they are matched to the table columns by\n",
    "# position and must follow the table's column order.\n",
    "ut = time.time()  # one timestamp shared by both new rows\n",
    "query = f\"\"\"INSERT INTO {database_name}.{table_name} VALUES('00006', 'Pen', 30,'Stationery',{ut}), ('00007', 'Book', 500,'Stationery',{ut})\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c1b8c7a4",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "62c4d603",
   "metadata": {},
   "source": [
    "## Update records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b6d2c312",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Change the price of product 00007 and refresh its updated_at timestamp.\n",
    "# The predicate uses the standard SQL '=' operator, as the MERGE statement's ON clause does.\n",
    "ut = time.time()\n",
    "query = f\"\"\"UPDATE {database_name}.{table_name} SET price=300, updated_at={ut} WHERE product_id = '00007'\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "79e66ede",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2749314e",
   "metadata": {},
   "source": [
    "## Upsert records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5dc1d8aa",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Stage the change set for the MERGE: one row for an existing product_id, one for a new one.\n",
    "ut = time.time()\n",
    "product_updates = [\n",
    "    dict(product_id='00001', product_name='Heater', price=400, category='Electronics', updated_at=ut),  # Update\n",
    "    dict(product_id='00008', product_name='Chair', price=50, category='Furniture', updated_at=ut),  # Insert\n",
    "]\n",
    "df_product_updates = spark.createDataFrame([Row(**x) for x in product_updates])\n",
    "\n",
    "# Expose the staged rows to SQL under the view name the MERGE statement reads from.\n",
    "df_product_updates.createOrReplaceTempView(\"tmp_products_updates\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "791e4cd3",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "-- Upsert: staged rows with a matching product_id overwrite the stored row, all others are inserted.\n",
    "MERGE INTO delta_sql.products AS old \\\n",
    "USING tmp_products_updates AS new \\\n",
    "ON old.product_id = new.product_id \\\n",
    "WHEN MATCHED THEN UPDATE SET \\\n",
    "    old.product_name = new.product_name, \\\n",
    "    old.price = new.price, \\\n",
    "    old.category = new.category, \\\n",
    "    old.updated_at = new.updated_at \\\n",
    "WHEN NOT MATCHED THEN INSERT \\\n",
    "    (product_id, product_name, price, category, updated_at) \\\n",
    "VALUES \\\n",
    "    (new.product_id, new.product_name, new.price, new.category, new.updated_at)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "002f9201",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "324a5cb1",
   "metadata": {},
   "source": [
    "## Alter Delta Lake table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2677931e",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "ALTER TABLE delta_sql.products ADD COLUMNS (CURRENCY STRING AFTER PRICE)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "caeab22e",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "-- No WHERE clause: sets CURRENCY on every row. The value is a single-quoted SQL string literal.\n",
    "UPDATE delta_sql.products SET CURRENCY = 'INR'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "203b5f1d",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a4fb1043",
   "metadata": {},
   "source": [
    "## Delete records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6133b28a",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "-- Remove the Pen product; standard equality operator and single-quoted string literal.\n",
    "DELETE FROM delta_sql.products WHERE product_name = 'Pen'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "96a6924e",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "712ba1d3",
   "metadata": {},
   "source": [
    "## View History"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "076409de",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "DESCRIBE HISTORY delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0c51d9c6",
   "metadata": {},
   "source": [
    "## Stop Session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f0b4eed8",
   "metadata": {},
   "outputs": [],
   "source": [
    "%stop_session"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Glue PySpark",
   "language": "python",
   "name": "glue_pyspark"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
